{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 环境准备"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "scrolled": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "Use one of the following command to start using pyalink:\n",
      "使用以下一条命令来开始使用 pyalink：\n",
      " - useLocalEnv(parallelism, flinkHome=None, config=None)\n",
      " - useRemoteEnv(host, port, parallelism, flinkHome=None, localIp=\"localhost\", config=None)\n",
      "Call resetEnv() to reset environment and switch to another.\n",
      "使用 resetEnv() 来重置运行环境，并切换到另一个。\n",
      "\n",
      "JVM listening on 127.0.0.1:51134\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "JavaObject id=o6"
      ]
     },
     "execution_count": 1,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# set env \n",
    "from pyalink.alink import *\n",
    "import sys, os\n",
    "resetEnv()\n",
    "useLocalEnv(2)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 数据准备"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "# schema of train data\n",
    "schemaStr = \"id string, click string, dt string, C1 string, banner_pos int, site_id string, \\\n",
    "            site_domain string, site_category string, app_id string, app_domain string, \\\n",
    "            app_category string, device_id string, device_ip string, device_model string, \\\n",
    "            device_type string, device_conn_type string, C14 int, C15 int, C16 int, C17 int, \\\n",
    "            C18 int, C19 int, C20 int, C21 int\"\n",
    "\n",
    "# prepare batch train data\n",
    "batchTrainDataFn = \"http://alink-release.oss-cn-beijing.aliyuncs.com/data-files/avazu-small.csv\"\n",
    "trainBatchData = CsvSourceBatchOp().setFilePath(batchTrainDataFn) \\\n",
    "        .setSchemaStr(schemaStr) \\\n",
    "        .setIgnoreFirstLine(True);\n",
    "# feature fit\n",
    "labelColName = \"click\"\n",
    "vecColName = \"vec\"\n",
    "numHashFeatures = 30000\n",
    "selectedColNames =[\"C1\",\"banner_pos\",\"site_category\",\"app_domain\",\n",
    "                  \"app_category\",\"device_type\",\"device_conn_type\", \n",
    "                  \"C14\",\"C15\",\"C16\",\"C17\",\"C18\",\"C19\",\"C20\",\"C21\",\n",
    "                   \"site_id\",\"site_domain\",\"device_id\",\"device_model\"]\n",
    "\n",
    "categoryColNames = [\"C1\",\"banner_pos\",\"site_category\",\"app_domain\", \n",
    "                    \"app_category\",\"device_type\",\"device_conn_type\",\n",
    "                    \"site_id\",\"site_domain\",\"device_id\",\"device_model\"]\n",
    "\n",
    "numericalColNames = [\"C14\",\"C15\",\"C16\",\"C17\",\"C18\",\"C19\",\"C20\",\"C21\"]\n",
    "\n",
    "# prepare stream train data\n",
    "wholeDataFile = \"http://alink-release.oss-cn-beijing.aliyuncs.com/data-files/avazu-ctr-train-8M.csv\"\n",
    "data = CsvSourceStreamOp() \\\n",
    "        .setFilePath(wholeDataFile) \\\n",
    "        .setSchemaStr(schemaStr) \\\n",
    "        .setIgnoreFirstLine(True);\n",
    "\n",
    "# split stream to train and eval data\n",
    "spliter = SplitStreamOp().setFraction(0.5).linkFrom(data)\n",
    "train_stream_data = spliter\n",
    "test_stream_data = spliter.getSideOutput(0)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 在线学习五步骤\n",
    "<ul>\n",
    "    <li>步骤一、特征工程</li>\n",
    "    <li>步骤二、批式模型训练</li>\n",
    "    <li>步骤三、在线模型训练（FTRL）</li>\n",
    "    <li>步骤四、在线预测</li>\n",
    "    <li>步骤五、在线评估</li>\n",
    "</ul>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 步骤一、特征工程"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "# setup feature enginerring pipeline\n",
    "feature_pipeline = Pipeline() \\\n",
    "        .add(StandardScaler() \\\n",
    "                .setSelectedCols(numericalColNames)) \\\n",
    "        .add(FeatureHasher() \\\n",
    "                .setSelectedCols(selectedColNames) \\\n",
    "                .setCategoricalCols(categoryColNames) \\\n",
    "                .setOutputCol(vecColName) \\\n",
    "                .setNumFeatures(numHashFeatures))\n",
    "\n",
    "# fit and save feature pipeline model\n",
    "FEATURE_PIPELINE_MODEL_FILE = os.path.join(os.getcwd(), \"feature_pipe_model.csv\")\n",
    "feature_pipeline.fit(trainBatchData).save(FEATURE_PIPELINE_MODEL_FILE);\n",
    "\n",
    "BatchOperator.execute();\n",
    "\n",
    "# load pipeline model\n",
    "feature_pipelineModel = PipelineModel.load(FEATURE_PIPELINE_MODEL_FILE);\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 步骤二、批式模型训练"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "# train initial batch model\n",
    "lr = LogisticRegressionTrainBatchOp()\n",
    "initModel = lr.setVectorCol(vecColName) \\\n",
    "        .setLabelCol(labelColName) \\\n",
    "        .setWithIntercept(True) \\\n",
    "        .setMaxIter(10) \\\n",
    "        .linkFrom(feature_pipelineModel.transform(trainBatchData)) "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 在线模型训练（FTRL）"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "# ftrl train \n",
    "model = FtrlTrainStreamOp(initModel) \\\n",
    "        .setVectorCol(vecColName) \\\n",
    "        .setLabelCol(labelColName) \\\n",
    "        .setWithIntercept(True) \\\n",
    "        .setAlpha(0.1) \\\n",
    "        .setBeta(0.1) \\\n",
    "        .setL1(0.01) \\\n",
    "        .setL2(0.01) \\\n",
    "        .setTimeInterval(10) \\\n",
    "        .setVectorSize(numHashFeatures) \\\n",
    "        .linkFrom(feature_pipelineModel.transform(train_stream_data))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 在线预测"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "'DataStream predResult: (Updated on 2019-12-05 15:03:33)'"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>click</th>\n",
       "      <th>pred</th>\n",
       "      <th>details</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "      <td>{\"0\":\"0.9046159047711626\",\"1\":\"0.0953840952288...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>1</td>\n",
       "      <td>0</td>\n",
       "      <td>{\"0\":\"0.7301554114492774\",\"1\":\"0.2698445885507...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2</th>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "      <td>{\"0\":\"0.9354702479573089\",\"1\":\"0.0645297520426...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>3</th>\n",
       "      <td>1</td>\n",
       "      <td>0</td>\n",
       "      <td>{\"0\":\"0.7472443769874088\",\"1\":\"0.2527556230125...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>4</th>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "      <td>{\"0\":\"0.7313933609276811\",\"1\":\"0.2686066390723...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>5</th>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "      <td>{\"0\":\"0.7579078017993002\",\"1\":\"0.2420921982006...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>6</th>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "      <td>{\"0\":\"0.9658883764493819\",\"1\":\"0.0341116235506...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>7</th>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "      <td>{\"0\":\"0.8916428187684737\",\"1\":\"0.1083571812315...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>8</th>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "      <td>{\"0\":\"0.964470362868512\",\"1\":\"0.03552963713148...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>9</th>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "      <td>{\"0\":\"0.7879843998010425\",\"1\":\"0.2120156001989...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>10</th>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "      <td>{\"0\":\"0.7701207324521978\",\"1\":\"0.2298792675478...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>11</th>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "      <td>{\"0\":\"0.8816330561252186\",\"1\":\"0.1183669438747...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>12</th>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "      <td>{\"0\":\"0.8671197714269967\",\"1\":\"0.1328802285730...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>13</th>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "      <td>{\"0\":\"0.9355228418514457\",\"1\":\"0.0644771581485...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>14</th>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "      <td>{\"0\":\"0.9098863130943347\",\"1\":\"0.0901136869056...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>15</th>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "      <td>{\"0\":\"0.7917622336863489\",\"1\":\"0.2082377663136...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>16</th>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "      <td>{\"0\":\"0.8377318499121809\",\"1\":\"0.1622681500878...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>17</th>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "      <td>{\"0\":\"0.9647915025127575\",\"1\":\"0.0352084974872...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>18</th>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "      <td>{\"0\":\"0.7313985049080408\",\"1\":\"0.2686014950919...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>19</th>\n",
       "      <td>1</td>\n",
       "      <td>0</td>\n",
       "      <td>{\"0\":\"0.8541619467983884\",\"1\":\"0.1458380532016...</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "   click pred                                            details\n",
       "0      0    0  {\"0\":\"0.9046159047711626\",\"1\":\"0.0953840952288...\n",
       "1      1    0  {\"0\":\"0.7301554114492774\",\"1\":\"0.2698445885507...\n",
       "2      0    0  {\"0\":\"0.9354702479573089\",\"1\":\"0.0645297520426...\n",
       "3      1    0  {\"0\":\"0.7472443769874088\",\"1\":\"0.2527556230125...\n",
       "4      0    0  {\"0\":\"0.7313933609276811\",\"1\":\"0.2686066390723...\n",
       "5      0    0  {\"0\":\"0.7579078017993002\",\"1\":\"0.2420921982006...\n",
       "6      0    0  {\"0\":\"0.9658883764493819\",\"1\":\"0.0341116235506...\n",
       "7      0    0  {\"0\":\"0.8916428187684737\",\"1\":\"0.1083571812315...\n",
       "8      0    0  {\"0\":\"0.964470362868512\",\"1\":\"0.03552963713148...\n",
       "9      0    0  {\"0\":\"0.7879843998010425\",\"1\":\"0.2120156001989...\n",
       "10     0    0  {\"0\":\"0.7701207324521978\",\"1\":\"0.2298792675478...\n",
       "11     0    0  {\"0\":\"0.8816330561252186\",\"1\":\"0.1183669438747...\n",
       "12     0    0  {\"0\":\"0.8671197714269967\",\"1\":\"0.1328802285730...\n",
       "13     0    0  {\"0\":\"0.9355228418514457\",\"1\":\"0.0644771581485...\n",
       "14     0    0  {\"0\":\"0.9098863130943347\",\"1\":\"0.0901136869056...\n",
       "15     0    0  {\"0\":\"0.7917622336863489\",\"1\":\"0.2082377663136...\n",
       "16     0    0  {\"0\":\"0.8377318499121809\",\"1\":\"0.1622681500878...\n",
       "17     0    0  {\"0\":\"0.9647915025127575\",\"1\":\"0.0352084974872...\n",
       "18     0    0  {\"0\":\"0.7313985049080408\",\"1\":\"0.2686014950919...\n",
       "19     1    0  {\"0\":\"0.8541619467983884\",\"1\":\"0.1458380532016..."
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "# ftrl predict\n",
    "predResult = FtrlPredictStreamOp(initModel) \\\n",
    "        .setVectorCol(vecColName) \\\n",
    "        .setPredictionCol(\"pred\") \\\n",
    "        .setReservedCols([labelColName]) \\\n",
    "        .setPredictionDetailCol(\"details\") \\\n",
    "        .linkFrom(model, feature_pipelineModel.transform(test_stream_data))\n",
    "\n",
    "predResult.print(key=\"predResult\", refreshInterval = 30, maxLimit=20)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 在线评估"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "'DataStream evaluation: (Updated on 2019-12-05 15:03:31)'"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>Statistics</th>\n",
       "      <th>Accuracy</th>\n",
       "      <th>AUC</th>\n",
       "      <th>ConfusionMatrix</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>all</td>\n",
       "      <td>0.8286096670786908</td>\n",
       "      <td>0.7182165258211499</td>\n",
       "      <td>[[5535,5007],[112297,561587]]</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>window</td>\n",
       "      <td>0.8464953470502861</td>\n",
       "      <td>0.7283501551891348</td>\n",
       "      <td>[[485,456],[8534,49090]]</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2</th>\n",
       "      <td>all</td>\n",
       "      <td>0.830019475336848</td>\n",
       "      <td>0.7191075542108774</td>\n",
       "      <td>[[6020,5463],[120831,610677]]</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>3</th>\n",
       "      <td>window</td>\n",
       "      <td>0.8455799884444143</td>\n",
       "      <td>0.7227709897015594</td>\n",
       "      <td>[[512,416],[8671,49247]]</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>4</th>\n",
       "      <td>all</td>\n",
       "      <td>0.8311614455307001</td>\n",
       "      <td>0.719465721678977</td>\n",
       "      <td>[[6532,5879],[129502,659924]]</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>5</th>\n",
       "      <td>window</td>\n",
       "      <td>0.8444954128440367</td>\n",
       "      <td>0.7259189182276968</td>\n",
       "      <td>[[545,455],[8698,49162]]</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>6</th>\n",
       "      <td>all</td>\n",
       "      <td>0.8320733080282608</td>\n",
       "      <td>0.7199603254520217</td>\n",
       "      <td>[[7077,6334],[138200,709086]]</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "  Statistics            Accuracy                 AUC  \\\n",
       "0        all  0.8286096670786908  0.7182165258211499   \n",
       "1     window  0.8464953470502861  0.7283501551891348   \n",
       "2        all   0.830019475336848  0.7191075542108774   \n",
       "3     window  0.8455799884444143  0.7227709897015594   \n",
       "4        all  0.8311614455307001   0.719465721678977   \n",
       "5     window  0.8444954128440367  0.7259189182276968   \n",
       "6        all  0.8320733080282608  0.7199603254520217   \n",
       "\n",
       "                 ConfusionMatrix  \n",
       "0  [[5535,5007],[112297,561587]]  \n",
       "1       [[485,456],[8534,49090]]  \n",
       "2  [[6020,5463],[120831,610677]]  \n",
       "3       [[512,416],[8671,49247]]  \n",
       "4  [[6532,5879],[129502,659924]]  \n",
       "5       [[545,455],[8698,49162]]  \n",
       "6  [[7077,6334],[138200,709086]]  "
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "# ftrl eval\n",
    "EvalBinaryClassStreamOp() \\\n",
    "        .setLabelCol(labelColName) \\\n",
    "        .setPredictionCol(\"pred\") \\\n",
    "        .setPredictionDetailCol(\"details\") \\\n",
    "        .setTimeInterval(10) \\\n",
    "        .linkFrom(predResult) \\\n",
    "        .link(JsonValueStreamOp() \\\n",
    "                .setSelectedCol(\"Data\") \\\n",
    "                .setReservedCols([\"Statistics\"]) \\\n",
    "                .setOutputCols([\"Accuracy\", \"AUC\", \"ConfusionMatrix\"]) \\\n",
    "                .setJsonPath([\"$.Accuracy\", \"$.AUC\", \"$.ConfusionMatrix\"])) \\\n",
    "                .print(key=\"evaluation\", refreshInterval = 30, maxLimit=20)\n",
    "StreamOperator.execute();"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.4"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
